package com.zhao.apitest.sink;

import com.zhao.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author ZhaoPan
 * @date 2022/3/29
 * @describe
 */
public class SinkTest4_JDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });

        dataStream.addSink(new MyJDBCSink());
        env.execute();
    }

    // 实现自定义SinkFunction
    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        //声明连接和预编译
        Connection connection=null;
        PreparedStatement insert=null;
        PreparedStatement update=null;
        @Override
        public void open(Configuration parameters) throws Exception {
            connection= DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456");
            insert=connection.prepareStatement("insert into sensor_temp (id,temp) values (?,?)");
            update=connection.prepareStatement("update sensor_temp set temp = ? where id = ? ");
        }

        // 每来一条数据，调用链接，执行sql
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
           // 直接执行更新
            update.setDouble(1,value.getTemperature());
            update.setString(2,value.getId());
            update.execute();
            if (update.getUpdateCount() == 0){
                insert.setString(1,value.getId());
                insert.setDouble(2,value.getTemperature());
                insert.execute();
            }
        }

        // 关闭连接流
        @Override
        public void close() throws Exception {
            connection.close();
            insert.close();
            update.close();
        }
    }
}
